Source code for hysop.numerics.odesolvers.runge_kutta

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import numpy as np

from hysop.tools.htypes import check_instance, first_not_None
from hysop.tools.numerics import is_fp
from hysop.numerics.odesolvers.runge_kutta_coeffs import (
    Itype,
    Qtype,
    rk_params,
    available_methods,
)

# default methods to dump rational and integers expression to string


[docs] def Q_dump(q): return f"({q.numerator}/{q.denominator})"
[docs] def I_dump(i): return f"{i}"
[docs] class TimeIntegrator: def __str__(self): return self.name()
[docs] class RungeKutta(TimeIntegrator): pass
# Tni = Tn + alpha_i*dt # Xni = Xn + dt*sum(j=0,i-1,gamma_ij*Knj) # Kni = F(Tni,Xni) # X_n+1 = Xn + dt*sum(i=0,M-1,beta_i*Ki)
[docs] class RhsFunction: def __call__(self, out, X, t, dt, **kwds): msg = "{}.__call__() has not been overrided." msg = msg.format(type(self).__name__) raise NotImplementedError(msg)
[docs] class ExplicitRungeKutta(RungeKutta): def __init__(self, method, I_dump=I_dump, Q_dump=Q_dump): if method not in available_methods(): msg = "{} was not implemented yet! Valid values are {}." msg = msg.format(method, available_methods.keys()) raise ValueError(msg) params = rk_params[method] self.method = method self.order = params["order"] self.stages = params["stages"] self.alpha = params["alpha"] self.beta = params["beta"] self.gamma = params["gamma"] self.I_dump = I_dump self.Q_dump = Q_dump self._hashable = True
[docs] def __call__( self, Xin, RHS, dt, t=0.0, views=None, Xout=None, buffers=None, **kwds ): """Buffers = dict of (nb_stages+1) np.ndarray of size compatible with Xin""" check_instance(Xin, dict, keys=str, values=np.ndarray) varnames = Xin.keys() views = first_not_None(views, {k: Ellipsis for (k, v) in Xin.items()}) Xout = first_not_None( Xout, {k: np.empty_like(v[views[k]]) for (k, v) in Xin.items()} ) buffers = first_not_None( buffers, { k: tuple(np.empty_like(v) for i in range(self.stages + 1)) for (k, v) in Xin.items() }, ) check_instance(views, dict, keys=str, values=(type(Ellipsis), slice, tuple)) check_instance(Xout, dict, keys=str, values=np.ndarray) check_instance(buffers, dict, keys=str, values=tuple) assert callable(RHS), type(RHS) if __debug__: compute_shape = next(iter(Xout.values())).shape assert varnames == Xout.keys() assert varnames == views.keys() assert varnames == buffers.keys() for vname in varnames: ivar = Xin[vname] ovar = Xout[vname] view = views[vname] assert is_fp(ivar.dtype), ivar.dtype assert is_fp(ovar.dtype), ovar.dtype assert np.all( ivar[view].shape == compute_shape ), f"{ivar[view].shape} vs {compute_shape}" assert np.all(ovar.shape == compute_shape), ovar.shape assert len(buffers[vname]) == self.stages + 1, self.stages + 1 for buf in buffers[vname]: assert is_fp(buf.dtype), buf.dtype assert np.all(buf.shape == ivar.shape) Xtmp = {k: v[0] for (k, v) in buffers.items()} K = tuple( {k: v[i] for (k, v) in buffers.items()} for i in range(1, self.stages + 1) ) for i in range(self.stages): ai = self.alpha[i] ti = t + float(ai) * dt if i == 0: RHS(out=K[i], X=Xin, t=ti, step=i, steps=self.stages, **kwds) else: for vname in varnames: Xtmp[vname][...] = 0 for j in range(i): gij = float(self.gamma[i - 1, j]) Xtmp[vname] += float(gij) * K[j][vname] Xtmp[vname] *= dt Xtmp[vname] += Xin[vname] RHS(out=K[i], X=Xtmp, t=ti, step=i, steps=self.stages, **kwds) for vname in varnames: Xout[vname][...] = 0 for i, bi in enumerate(self.beta): Xout[vname] += float(bi) * K[i][vname][views[vname]] Xout[vname] *= dt Xout[vname] += Xin[vname][views[vname]] return Xout
def __eq__(self, other): if not isinstance(other, ExplicitRungeKutta): return NotImplemented eq = self.method == other.method # eq &= self.I_dump == other.I_dump # eq &= self.Q_dump == other.Q_dump return eq def __ne__(self, other): eq = self == other if isinstance(eq, ExplicitRungeKutta): return NotImplemented return not eq def __hash__(self): return hash(self.method)
[docs] def name(self): return self.method
[docs] def dump(self, val): if isinstance(val, Itype): return self.I_dump(val) elif isinstance(val, Qtype): return self.Q_dump(val) else: return f"{val}"
# Tni = Tn + alpha_i*dt
[docs] def Tni(self, i, Tn, dt): alpha = self.alpha[i] if alpha == 0: return f"{Tn}" elif alpha == 1: return f"{Tn} + {dt}" else: return f"{Tn} + {self.dump(alpha)}*{dt}"
# Xni = Xn + dt*sum(j=0,i-1,gamma_ij*Knj)
[docs] def Xni_sum(self, i): _sum = "" gamma = self.gamma[i - 1, :] for j in range(i): g = gamma[j] if g == 0: continue elif g == 1: _sum += f"K[{j}]" else: _sum += f"{self.dump(g)}*K[{j}]" _sum += " + " _sum = _sum[:-3] return _sum
[docs] def Xni(self, i, Xn, dt): if i > 0: _sum = self.Xni_sum(i) return f"{Xn} + {dt}*({_sum})" else: return f"{Xn}"
# X_n+1 = Xn + dt*sum(i=0,M-1,beta_i*Ki)
[docs] def step_sum(self): _sum = "" for i in range(self.stages): beta = self.beta[i] if beta == 0: continue elif beta == 1: _sum += f"K[{i}]" else: _sum += f"{self.dump(beta)}*K[{i}]" _sum += " + " _sum = _sum[:-3] return _sum
[docs] def step(self, Xn, dt): return f"{Xn} + {dt}*({self.step_sum()})"
Euler = ExplicitRungeKutta("Euler") RK1 = ExplicitRungeKutta("RK1") RK2 = ExplicitRungeKutta("RK2") RK3 = ExplicitRungeKutta("RK3") RK4 = ExplicitRungeKutta("RK4") RK4_38 = ExplicitRungeKutta("RK4_38") if __name__ == "__main__": R = ExplicitRungeKutta("RK4") for i in range(4): print(R.Tni(i, Tn="To", dt="dt")) print() for i in range(4): print(R.Xni(i, Xn="Xo", dt="dt")) print() print(R.step(Xn="Xo", dt="dt")) Xin = {"a": np.random.rand(10, 10).astype(np.float32)} Xout = {"a": np.empty_like(Xin["a"])} print("\nRHS=0") class Rhs(RhsFunction): def __call__(self, out, X, t, dt, **kwds): out["a"][...] = 0 rhs = Rhs() for Integrator in (Euler, RK2, RK3, RK4, RK4_38): print(f" {Integrator.name().ljust(8)}", end=" ") dt = 0.1 Integrator(Xin, rhs, dt=dt, Xout=Xout) d = np.max(np.abs(Xout["a"] - Xin["a"])) print(f" d={d}") assert d < 1e-7, d print("\nRHS=1") def rhs(out, X, t, dt, **kwds): out["a"][...] = 1 for Integrator in (Euler, RK2, RK3, RK4, RK4_38): print(f" {Integrator.name().ljust(8)}", end=" ") dt = 0.1 Integrator(Xin, rhs, dt=dt, Xout=Xout) d = np.max(np.abs((Xout["a"] - Xin["a"]) - dt)) print(f" d={d}") assert d < 1e-7, d print("\nRHS=cos(t)") def rhs(out, X, t, dt, **kwds): out["a"][...] = np.cos(t) for Integrator in (Euler, RK2, RK3, RK4, RK4_38): print(f" {Integrator.name().ljust(8)}", end=" ") dt = 0.1 Integrator(Xin, rhs, dt=dt, Xout=Xout) alpha = np.asarray(tuple(float(x) for x in Integrator.alpha), dtype=np.float32) beta = np.asarray(tuple(float(x) for x in Integrator.beta), dtype=np.float32) dX = np.dot(np.cos(alpha * dt), beta) d = np.max(np.abs((Xout["a"] - Xin["a"]) - dX * dt)) print(f" d={d}") assert d < 1e-7, d print("\nRHS=f(x)") def rhs(out, X, t, dt, **kwds): out["a"][...] = 0.01 * X["a"] + 0.02 for Integrator in (Euler, RK2, RK3, RK4, RK4_38): print(f" {Integrator.name().ljust(8)}", end=" ") dt = 0.1 Integrator(Xin, rhs, dt=dt, Xout=Xout) alpha = np.asarray(tuple(float(x) for x in Integrator.alpha), dtype=np.float32) beta = np.asarray(tuple(float(x) for x in Integrator.beta), dtype=np.float32) K = [ None, ] * Integrator.stages beta = np.asarray(tuple(float(x) for x in Integrator.beta), dtype=np.float32) for i in range(Integrator.stages): if i == 0: X = Xin["a"].copy() else: X[...] = 0 for j in range(i): gij = float(Integrator.gamma[i - 1, j]) X[...] += float(gij) * K[j] X[...] *= dt X[...] += Xin["a"] K[i] = 0.01 * X + 0.02 dX = np.zeros_like(X) for i, bi in enumerate(beta): dX += bi * K[i] d = np.max(np.abs((Xout["a"] - Xin["a"]) - dX * dt)) print(f" d={d}") assert d < 1e-7, d print("\nRHS=f(x, t)") def rhs(out, X, t, dt, **kwds): out["a"][...] = 0.01 * X["a"] + 2 * t for Integrator in (Euler, RK2, RK3, RK4, RK4_38): print(f" {Integrator.name().ljust(8)}", end=" ") dt = 0.1 Integrator(Xin, rhs, dt=dt, Xout=Xout) alpha = np.asarray(tuple(float(x) for x in Integrator.alpha), dtype=np.float32) beta = np.asarray(tuple(float(x) for x in Integrator.beta), dtype=np.float32) K = [ None, ] * Integrator.stages alpha0 = np.asarray(tuple(float(x) for x in Integrator.alpha), dtype=np.float32) beta = np.asarray(tuple(float(x) for x in Integrator.beta), dtype=np.float32) for i in range(Integrator.stages): ti = 0 + alpha[i] * dt if i == 0: X = Xin["a"].copy() else: X[...] = 0 for j in range(i): gij = float(Integrator.gamma[i - 1, j]) X[...] += float(gij) * K[j] X[...] *= dt X[...] += Xin["a"] K[i] = 0.01 * X + 2 * ti dX = np.zeros_like(X) for i, bi in enumerate(beta): dX += bi * K[i] d = np.max(np.abs((Xout["a"] - Xin["a"]) - dX * dt)) print(f" d={d}") assert d < 1e-7, d